#include "Session.h"
#include "SocketServer.h"
#include "ClientConnection.h"
#include "Json.h"
#include <signal.h>

static Session* _instance = NULL;
Session::Session(int port)
{
    m_pSocketServer = new SocketServer(port);
    m_bExit = true;
    _instance = this;
}

Session::~Session()
{
    m_bExit = true;
	if (m_pSocketServer) {
		delete m_pSocketServer;
		m_pSocketServer = NULL;
	}
}

bool Session::init()
{
    m_pSocketServer->register_server_observer(this);
    m_pSocketServer->start();
    return true;
}

void Session::run()
{
    m_bExit = false;
    while (!m_bExit) {
        process_heartbeat();
		utils_sleep(2000);
    }
    m_pSocketServer->stop();
}

void Session::process_heartbeat()
{
	if (m_vecUserName.empty()) {
		return;
	}
	LOG_DEBUG("process_heartbeat");
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		(*it)->process_heartbeat();
	}
}

void Session::process_publish(struct lws *wsi)
{
	LOG_DEBUG("process_publish");
    alarm(0);
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		(*it)->process_publish(wsi);
		if ((*it)->is_publisher()) {
			m_strPublisher = (*it)->get_user_name();
		}
	}
}

void Session::process_video_data(uint8_t* data, int len, struct lws *wsi)
{
	LOG_DEBUG("process_video_data len=%d", len);
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		if (!((*it)->is_publisher())) {
			(*it)->process_video_data(data, len, wsi);
		}
	}
}

void Session::process_picture_data(uint8_t* data, int len)
{
	LOG_DEBUG("process_picture_data len=%d", len);
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		if (!((*it)->is_publisher())) {
			(*it)->process_picture_data(data, len);
		}
	}
}

void Session::process_video_ack(uint8_t* data, int len)
{
	LOG_DEBUG("process_video_ack len=%d", len);
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		if ((*it)->is_publisher()) {
			(*it)->process_video_ack(data, len);
		}
	}
}

void Session::process_request_key_frame(uint8_t* data, int len)
{
	LOG_DEBUG("process_request_key_frame len=%d", len);
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		if ((*it)->is_publisher()) {
			(*it)->process_request_key_frame(data, len);
		}
	}
}

void Session::process_operate(struct lws *wsi)
{
	LOG_DEBUG("process_operate");
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		(*it)->process_operate(wsi);
		if ((*it)->is_operater()) {
			m_strOperater = (*it)->get_user_name();
		}
	}
}

void Session::process_input_event(uint8_t* data, int len)
{
	LOG_DEBUG("process_input_event len=%d", len);
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		if (!((*it)->is_operater())) {
			(*it)->process_input_event(data, len);
		}
	}
}

void Session::process_stop_stream(struct lws *wsi)
{
	LOG_DEBUG("process_stop_stream");
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		if ((*it)->is_publisher()) {
			m_strPublisher.clear();
		}
		(*it)->process_stop_stream(wsi);
	}
}

void Session::process_cursor_shape(uint8_t* data, int len)
{
	LOG_DEBUG("process_cursor_shape");
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		if (!((*it)->is_publisher()) && (*it)->is_operater()) {
			(*it)->process_cursor_shape(data, len);
		}
	}
}

void Session::process_audio_data(uint8_t* data, int len)
{
	LOG_DEBUG("process_audio_data");
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		if (!((*it)->is_publisher())) {
			(*it)->process_audio_data(data, len);
		}
	}
}

void Session::process_stream_only(struct lws *wsi)
{
	LOG_DEBUG("process_stream_only");
	alarm(0);
}

string Session::get_heartbeat_info()
{
    Json::FastWriter writer;
    Json::Value root;
    root["UserNum"] = (int)m_vecUserName.size();
    for (int i = 0; i < (int)m_vecUserName.size(); i++) {
        root["UserList"][i] = m_vecUserName[i];
    }
    root["Publisher"] = m_strPublisher;
    root["Operater"] = m_strOperater;
    return writer.write(root);
}

void Session::set_buffer_clear(bool bClear)
{
	for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
	{
		if (!((*it)->is_publisher())) {
			(*it)->set_buffer_clear(bClear);
		}
	}
}

void Session::on_connection_closed(struct lws *wsi)
{
    LOG_DEBUG("on_connection_closed. wsi %p", wsi);
    int i = 0;
    bool bFound = false;
    m_vecUserName.clear();
    vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin();
    while (it != m_pClientConnectionVector.end()) {
        if ((*it)->get_websocket_handle() == wsi) {
            bFound = true;
            if ((*it)->is_publisher()) {
                signal(SIGALRM, process_exit);
                alarm(30);
            }
            delete *it;
            it = m_pClientConnectionVector.erase(it);
            continue;
        }
        m_vecUserName.push_back((*it)->get_user_name());
        it++;
        i++;
    }
    if (!bFound) {
        signal(SIGALRM, stream_quit);
        alarm(3);
    }
}

void Session::on_connection_opened(string& user_name, struct lws *wsi)
{
    LOG_DEBUG("on_connection_opened");
    int i = 0;
    ClientConnection* pConnection = new ClientConnection(user_name, wsi, this);
    m_pClientConnectionVector.push_back(pConnection);
    m_vecUserName.clear();
    for (vector<ClientConnection*>::iterator it = m_pClientConnectionVector.begin(); it != m_pClientConnectionVector.end(); it++)
    {
        if ((*it)->is_publisher()) {
            pConnection->process_publish(NULL);
        }
        m_vecUserName.push_back((*it)->get_user_name());
        i++;
    }
}

void Session::on_message(int type, uint8_t* data, int len, struct lws *wsi)
{
    switch (type) {
    case kMsgTypePublish:
		process_publish(wsi);
		break;
    case kMsgTypeVideoData:
		process_video_data(data, len, wsi);
		break;
    case kMsgTypeVideoAck:
		process_video_ack(data, len);
		break;
    case kMsgTypeRequestKeyFrame:
		process_request_key_frame(data, len);
		break;
    case kMsgTypePicture:
		process_picture_data(data, len);
		break;
    case kMsgTypeOperate:
		process_operate(wsi);
		break;
    case kMsgTypeMouseEvent:
    case kMsgTypeKeyboardEvent:
		process_input_event(data, len);
		break;
    case kMsgTypeStopStream:
		process_stop_stream(wsi);
		break;
    case kMsgTypeCursorShape:
		process_cursor_shape(data, len);
		break;
    case kMsgTypeAudioData:
		process_audio_data(data, len);
		break;
    case kMsgTypeStreamOnly:
		process_stream_only(wsi);
		break;
    default:
		LOG_WARN("unknown message type %d", type);
                break;
    }
}

void Session::process_exit(int param)
{
    LOG_WARN("publisher disconnect timeout");
    _instance->m_bExit = true;
}

void Session::stream_quit(int param)
{
    LOG_WARN("notify reciever stream quit");
    for (vector<ClientConnection*>::iterator it = _instance->m_pClientConnectionVector.begin(); it != _instance->m_pClientConnectionVector.end(); it++)
    {
        if (!((*it)->is_publisher())) {
            (*it)->process_stream_quit();
        }
    }
}

